#!/usr/bin/python3
#
# test-decode - run the decoding toolchain against an RF sample
# Copyright (C) 2019-2022 Adam Sampson
# Copyright (C) 2022 Chad Page
#
# This file is part of ld-decode.
#
# test-decode is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# This script is intended for short decodes during regression testing and
# debugging -- it won't work well for real-world decoding.

import argparse
import json
import numpy
import os
import subprocess
import sys

# Settings shared by the helper functions below; main() assigns the real
# values once the command line has been parsed.

# When true, commands are echoed to stderr but never executed.
dry_run = False

# Source tree (where ld-cut and ld-decode are run from) and build tree
# (where the compiled tools are run from).
source_dir = build_dir = None

def die(*args):
    """Report a fatal error on stderr and terminate with exit status 1.

    The arguments are converted to strings and joined with single spaces,
    exactly as print() would do.
    """
    message = ' '.join(map(str, args))
    print(message, file=sys.stderr)
    raise SystemExit(1)

def safe_unlink(filename):
    """Delete filename; a file that is already absent is not an error.

    Any other failure (e.g. a permissions problem) still raises.
    """
    try:
        os.remove(filename)
    except FileNotFoundError:
        # Nothing to delete -- that's exactly the state we wanted.
        return

def clean(args, suffixes):
    """Delete stale output files left over from a previous run.

    Each entry in suffixes is appended to the output base name
    (args.output) to form a filename; files that don't exist are skipped.
    """
    stale_files = (args.output + suffix for suffix in suffixes)
    for stale_file in stale_files:
        safe_unlink(stale_file)

def run_command(cmd, **kwopts):
    """Echo a command to stderr and then execute it.

    cmd is the argument list; any keyword options are passed through to
    subprocess.call. In dry-run mode the command is only echoed. If the
    command exits with a non-zero status, the script exits with an error
    message.
    """
    print('\n>>>', ' '.join(cmd), file=sys.stderr)

    if not dry_run:
        # Our own output may be buffered; push it out before the child
        # writes anything, so the two don't end up out of order.
        for stream in (sys.stdout, sys.stderr):
            stream.flush()

        # The child's stderr is merged into its stdout.
        status = subprocess.call(cmd, stderr=subprocess.STDOUT, **kwopts)
        if status != 0:
            die(cmd[0], 'failed with exit code', status)

def run_decode_command(cmd):
    """Run an ld-cut/ld-decode command, setting PATH so that helper programs
    can be found.

    The build directory, the ld-ldf-reader directory inside it, and the
    source directory are placed ahead of the inherited PATH in the child's
    environment. The environment of this process is not modified.
    """

    env = os.environ.copy()
    search_path = [
        build_dir,
        build_dir + '/tools/ld-ldf-reader',
        source_dir,
        ]

    # PATH may be unset (e.g. under a minimal test harness). Don't append an
    # empty entry in that case, since an empty PATH element means "current
    # directory".
    inherited = env.get('PATH')
    if inherited:
        search_path.append(inherited)

    env['PATH'] = os.pathsep.join(search_path)
    run_command(cmd, env=env)

def run_ld_cut(args):
    """Trim the input with ld-cut and make the result the new input.

    The cut is written to <output>.cut.ldf. The start point comes from
    args.cutseek (passed as -S) if set, otherwise from args.cutstart
    (passed as -s); the length is args.cutlength frames. On return,
    args.infile has been replaced with the name of the cut file.
    """

    cutfile = args.output + '.cut.ldf'
    safe_unlink(cutfile)

    cmd = [source_dir + '/ld-cut']
    if args.pal:
        cmd.append('--pal')

    # A seek point takes priority over a start point if both are given
    if args.cutseek is not None:
        cmd.extend(['-S', str(args.cutseek)])
    elif args.cutstart is not None:
        cmd.extend(['-s', str(args.cutstart)])
    else:
        die(cmd[0], 'internal error: run_ld_cut() run without seek or start points')

    cmd.extend(['-l', str(args.cutlength), args.infile, cutfile])
    run_decode_command(cmd)

    # Later stages decode the trimmed file rather than the original
    args.infile = cutfile

def run_ld_decode(args):
    """Run ld-decode.

    Decodes args.infile into output files named after args.output, removing
    any output files left by a previous run first. If args.expect_ac3_bytes
    is set (and this is not a dry run), exits with an error unless the .ac3
    output file exists and is at least that many bytes long.
    """

    clean(args, ['.tbc', '.tbc.json', '.efm', '.pcm', '.ac3', '.ac3.log'])

    cmd = [source_dir + '/ld-decode']
    cmd += ['--ignoreleadout']
    if args.pal:
        cmd += ['--pal']
    if args.AC3:
        cmd += ['--AC3']

    cmd += [args.infile, args.output]

    run_decode_command(cmd)

    # If doing AC3, check that the output file is large enough
    if (args.expect_ac3_bytes is not None) and (not dry_run):
        ac3_file = args.output + '.ac3'
        if not os.path.exists(ac3_file):
            die(ac3_file, 'does not exist')
        ac3_bytes = os.stat(ac3_file).st_size
        if ac3_bytes < args.expect_ac3_bytes:
            # The threshold lives in args.expect_ac3_bytes (the dest of
            # --expect-AC3-file-size); there is no args.ac3_bytes.
            die(ac3_file, 'contains', ac3_bytes,
                'bytes; expected at least', args.expect_ac3_bytes)

def _field_value(field, section, key):
    """Return field[section][key], or None if the section or key is absent."""

    section_data = field.get(section)
    if not isinstance(section_data, dict):
        return None
    return section_data.get(key)

def run_ld_process_vbi(args):
    """Run ld-process-vbi, then check and/or print the resulting metadata.

    After the tool has run on <output>.tbc, <output>.tbc.json is loaded and
    the checks requested on the command line are applied to its "fields"
    list: median bPSNR (args.expect_bpsnr), VBI values (args.expect_vbi) and
    VITC values (args.expect_vitc). args.print_vbi and args.print_vitc print
    the values found. Any failed check exits with an error message. Nothing
    is checked or printed in dry-run mode.
    """

    clean(args, ['.tbc.json.bup'])

    cmd = [build_dir + '/tools/ld-process-vbi/ld-process-vbi']
    cmd += [args.output + '.tbc']
    run_command(cmd)

    if dry_run:
        return

    # Read the JSON output
    json_file = args.output + '.tbc.json'
    if not os.path.exists(json_file):
        die(json_file, 'does not exist')
    with open(json_file, encoding='utf-8') as f:
        data = json.load(f)
    if "fields" not in data:
        die(json_file, 'does not contain fields')
    fields = data["fields"]

    # Check black SNR
    if args.expect_bpsnr is not None:
        values = [value
                  for value in (_field_value(field, "vitsMetrics", "bPSNR")
                                for field in fields)
                  if value is not None]
        # With no values at all, the median would be NaN, and a comparison
        # against NaN never fails -- so report that case explicitly.
        if not values:
            die(json_file, 'does not contain any bPSNR values')
        bpsnr = numpy.median(values)
        # Written as "not >=" so that a NaN median also counts as a failure
        if not (bpsnr >= args.expect_bpsnr):
            die(json_file, 'has median bPSNR', bpsnr, 'dB, expected',
                args.expect_bpsnr, 'dB')

    # Print VBI data (useful for finding --expect_vbi values)
    if args.print_vbi:
        for field in fields:
            # Not every field necessarily has VBI data; skip those without
            vbi_data = _field_value(field, "vbi", "vbiData")
            if vbi_data is not None:
                print("VBI data:", vbi_data)

    # Check for a field with the expected VBI values
    if args.expect_vbi is not None:
        if not any(_field_value(field, "vbi", "vbiData") == args.expect_vbi
                   for field in fields):
            die(json_file, 'did not contain a field with VBI values',
                args.expect_vbi)

    # Print VITC data
    if args.print_vitc:
        for field in fields:
            vitc_data = _field_value(field, "vitc", "vitcData")
            if vitc_data is not None:
                print("VITC data:", vitc_data)

    # Check for a field with the expected VITC values
    if args.expect_vitc is not None:
        if not any(_field_value(field, "vitc", "vitcData") == args.expect_vitc
                   for field in fields):
            die(json_file, 'did not contain a field with VITC values',
                args.expect_vitc)

def run_ld_export_metadata(args):
    """Export the decoded metadata with ld-export-metadata.

    Reads <output>.tbc.json and asks the tool to write <output>.vits.csv,
    <output>.vbi.csv and <output>.ffmetadata, after removing any copies of
    those files left by a previous run.
    """

    base = args.output
    clean(args, ['.vits.csv', '.vbi.csv', '.ffmetadata'])

    run_command([
        build_dir + '/tools/ld-export-metadata/ld-export-metadata',
        '--vits-csv', base + '.vits.csv',
        '--vbi-csv', base + '.vbi.csv',
        '--ffmetadata', base + '.ffmetadata',
        base + '.tbc.json',
    ])

def run_ld_process_efm(args):
    """Decode <output>.efm into <output>.digital.pcm with ld-process-efm.

    Does nothing if args.no_efm is set. Outside dry-run mode, the EFM file
    must exist and be non-empty, and if args.expect_efm_samples is set the
    PCM output must contain at least that many samples; otherwise the script
    exits with an error.
    """

    if args.no_efm:
        return

    efm_file = args.output + '.efm'
    pcm_file = args.output + '.digital.pcm'
    clean(args, ['.digital.pcm'])

    if not dry_run:
        # XXX ld-process-efm shows a dialogue if given an empty input file,
        # so catch that case here before running it
        if not os.path.exists(efm_file):
            die(efm_file, 'does not exist')
        if os.stat(efm_file).st_size == 0:
            die(efm_file, 'is empty')

    run_command([build_dir + '/tools/ld-process-efm/ld-process-efm',
                 efm_file, pcm_file])

    # Only verify the output size when asked to, and when something was run
    if args.expect_efm_samples is None or dry_run:
        return

    if not os.path.exists(pcm_file):
        die(pcm_file, 'does not exist')
    # One counted sample occupies 2 * 2 bytes (presumably a stereo pair of
    # 16-bit values, going by the --expect-efm-samples help text)
    bytes_per_sample = 2 * 2
    pcm_samples = os.stat(pcm_file).st_size // bytes_per_sample
    if pcm_samples < args.expect_efm_samples:
        die(pcm_file, 'contains', pcm_samples,
            'samples; expected at least', args.expect_efm_samples)

def run_ld_dropout_correct(args):
    """Run ld-dropout-correct (with --overcorrect) on the decoded video.

    Reads <output>.tbc and writes <output>.doc.tbc, removing any previous
    .doc.tbc and .doc.tbc.json files first.
    """

    clean(args, ['.doc.tbc', '.doc.tbc.json'])

    tbc_in = args.output + '.tbc'
    tbc_out = args.output + '.doc.tbc'
    run_command([
        build_dir + '/tools/ld-dropout-correct/ld-dropout-correct',
        '--overcorrect', tbc_in, tbc_out,
    ])

def run_ld_chroma_decoder(args, decoder):
    """Convert <output>.doc.tbc to <output>.rgb with ld-chroma-decoder.

    decoder is the value to pass with --decoder, or None to leave the
    option out. If args.expect_frames is set (and this is not a dry run),
    exits with an error unless the RGB file exists and holds at least that
    many frames.
    """

    rgb_file = args.output + '.rgb'
    clean(args, ['.rgb'])

    decoder_opts = [] if decoder is None else ['--decoder', decoder]
    run_command([build_dir + '/tools/ld-chroma-decoder/ld-chroma-decoder',
                 *decoder_opts,
                 args.output + '.doc.tbc', rgb_file])

    # Only verify the output size when asked to, and when something was run
    if args.expect_frames is None or dry_run:
        return

    if not os.path.exists(rgb_file):
        die(rgb_file, 'does not exist')

    # Output frame dimensions for each video standard
    frame_w, frame_h = (928, 576) if args.pal else (760, 488)
    # 2 * 3 bytes per pixel (presumably three 16-bit colour components)
    bytes_per_frame = 2 * 3 * frame_w * frame_h
    rgb_frames = os.stat(rgb_file).st_size // bytes_per_frame
    if rgb_frames < args.expect_frames:
        die(rgb_file, 'contains', rgb_frames,
            'frames; expected at least', args.expect_frames)

def parse_list_arg(s, num):
    """Convert a comma-separated string into a list of exactly num integers.

    Raises ValueError if the number of items is not num, or if any item is
    not a valid integer.
    """
    items = s.split(",")
    if len(items) == num:
        return list(map(int, items))
    raise ValueError(f"Argument must have {num} values")

def parse_vbi_arg(s):
    """Parse an --expect-vbi value: a comma-separated list of 3 integers."""
    return parse_list_arg(s, num=3)

def parse_vitc_arg(s):
    """Parse an --expect-vitc value: a comma-separated list of 8 integers."""
    return parse_list_arg(s, num=8)

def main():
    parser = argparse.ArgumentParser(description='Run the decoding toolchain against an RF sample')
    group = parser.add_argument_group("Decoding")
    group.add_argument('infile', metavar='infile',
                       help='RF source file')
    group.add_argument('output', metavar='output', nargs='?', default='testout/test',
                       help='base name for output files (default testout/test)')
    group.add_argument('-n', '--dry-run', action='store_true',
                       help='show commands, rather than running them')
    group.add_argument('--source', metavar='DIR',
                       help='source tree to test (default same as this script)')
    group.add_argument('--build', metavar='DIR',
                       help='build tree to test (default source tree)')
    group.add_argument('--pal', action='store_true',
                       help='source is PAL (default NTSC)')
    group.add_argument('--no-efm', action='store_true', dest='no_efm',
                       help='source has no EFM')
    group.add_argument('--AC3', action='store_true', dest='AC3',
                       help='source has AC3 audio')
    group.add_argument('--cut-seek', dest='cutseek', type=int,
                       help='start ld-cut by seeking to frame #')
    group.add_argument('--cut-start', dest='cutstart', type=int,
                       help='start ld-cut by seeking to frame #')
    group.add_argument('--cut-length', dest='cutlength', type=int, default = 2,
                       help='get N frames from ld-cut')
    group.add_argument('--decoder', metavar='decoder', action='append',
                       dest='decoders', default=[],
                       help='use specific ld-chroma-decoder decoder '
                            '(use more than once to test multiple decoders)')
    group = parser.add_argument_group("Sanity checks")
    group.add_argument('--expect-frames', metavar='N', type=int,
                       help='expect at least N frames of video output')
    group.add_argument('--expect-bpsnr', metavar='DB', type=float,
                       help='expect median bPSNR of at least DB')
    group.add_argument('--expect-vbi', metavar='N,N,N', type=parse_vbi_arg,
                       help='expect at least one field with VBI values N,N,N')
    group.add_argument('--print-vbi', action='store_true',
                       help='Print VBI values (to find out what to pass into expect-vbi)')
    group.add_argument('--expect-vitc', metavar='N,N,N,N,N,N,N,N', type=parse_vitc_arg,
                       help='expect at least one field with VITC values N,N,N,N,N,N,N,N')
    group.add_argument('--print-vitc', action='store_true',
                       help='Print VITC values (to find out what to pass into expect-vitc)')
    group.add_argument('--expect-efm-samples', metavar='N', type=int,
                       help='expect at least N stereo pairs of samples in EFM output')
    group.add_argument('--expect-AC3-file-size', dest='expect_ac3_bytes', metavar='N', type=int,
                       help='expect at least N bytes in EFM output')
    args = parser.parse_args()

    global dry_run
    dry_run = args.dry_run
    if args.decoders == []:
        args.decoders = [None]

    if args.infile.startswith("testdata/") and not os.path.exists(args.infile):
        print('Input file', args.infile, 'not present, skipping test')
        return

    # Find the top-level source directory
    prog_path = os.path.realpath(sys.argv[0])
    global source_dir, build_dir
    source_dir = os.path.dirname(os.path.dirname(prog_path))
    if args.source:
        source_dir = args.source
    build_dir = source_dir
    if args.build:
        build_dir = args.build

    print('Decoding', args.infile, 'using tools from', build_dir, file=sys.stderr)

    # Remove display environment variables, as the decoding tools shouldn't
    # depend on having a display
    for var in ('DISPLAY', 'WAYLAND_DISPLAY'):
        if var in os.environ:
            del os.environ[var]

    # Ensure the directory containing output files exists
    output_dir = os.path.dirname(args.output)
    if output_dir != '':
        os.makedirs(output_dir, exist_ok=True)

    # Produce a trimmed input file using ld-cut, if requested
    orig_infile = args.infile
    if args.cutseek is not None or args.cutstart is not None:
        run_ld_cut(args)

    # Run the stages of the decoding toolchain
    run_ld_decode(args)
    run_ld_process_vbi(args)
    run_ld_export_metadata(args)
    run_ld_process_efm(args)
    run_ld_dropout_correct(args)
    for decoder in args.decoders:
        run_ld_chroma_decoder(args, decoder)

    print('\nDecoding', orig_infile, 'completed successfully')

# Run the toolchain only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()
